package com.i72.basic;

import com.i72.freeway.StringHelper;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.Watcher;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author jiangj
 * @version 1.0.0
 * @ClassName SOAServiceCenter.java
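 * @Description Registers this application in ZooKeeper, discovers the addresses of other services, and loads ZooKeeper-backed configuration.
 * @createTime 2021-12-28 17:51:00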
 */
@Slf4j
public class SOAServiceCenter implements ApplicationContextAware {

    // ZooKeeper connect string from configuration; when non-empty it overrides the SVJIA_ZK_SERVER environment variable.
    @Value("${zk.discovery.server-addr}")
    private String zkAddress;

    // Name of this application; used as its config root ("/" + appName) and as its registration node name.
    @Value("${spring.application.name}")
    private String appName;

    // Optional NodeType name applied when registering in the CUSTOM stage; empty by default.
    @Value("${zk.discovery.server-type:}")
    private String serverType;

    // Copy of appName, assigned during @PostConstruct initialization.
    public static String applicationName = "";

    // Local IP obtained from TraceIdHelper during @PostConstruct initialization.
    public static String localIP = "";

    // NOTE(review): only referenced from commented-out code in the visible part of this file.
    public static final String SUBSCRIBE_LIST = "SUBSCRIBE-LIST";

    // Environment variable that supplies the k8s service IP.
    private static final String      K8S_SERVICE_IP        = "K8S_SERVICE_IP";
    // Config key whose value selects which node type(s) to register outside the CUSTOM stage.
    private static final String      SOAServiceNodeType    = "SOAServiceNodeType";
    // Service names recorded via addSubscribeServiceName.
    private static final Set<String> subscribeServiceNames = new HashSet<>();
    // Root ZooKeeper path under which every service registers itself.
    public static final String            SOA_SERVICE_NODE  = "/soaservices";
    // Shared ZooKeeper client, created once in getZkClient().
    private static ZkClient zkClient       = null;
    // Service name -> known provider addresses.
    private static Map<String, SOAServiceAddress> serviceAddress = new HashMap<>();
    // Per-service overrides read by getServiceAddress(String); never assigned in the visible code (the assigning init is commented out).
    private static Map<String, String>            serviceConfig;
    // Instance used only to create the (non-static) inner listener objects below.
    private static SOAServiceCenter soaServiceCenter = new SOAServiceCenter();
    private static SOPSOAListener sopSOAListener = soaServiceCenter.new SOPSOAListener();
    private static SOPConfigListener sopConfigListener = soaServiceCenter.new SOPConfigListener();
    private static SOPOtherConfigListener sopOtherConfigListener = soaServiceCenter.new SOPOtherConfigListener();
    // Every application name that has been passed to registerServiceAddress.
    private static Set<String> registerNodeNames = new HashSet<>();
    // Environment variable that may supply the ZooKeeper connect string.
    private static final String                           SVJIA_ZK_SERVER         = "SVJIA_ZK_SERVER";
    // Address registered in the CUSTOM stage; init() sets it equal to podAddress.
    private static String customIp;
    // "localIP:port" of this instance, computed in init().
    private static String podAddress;
    // Value of the K8S_SERVICE_IP environment variable, when present.
    private static String k8sServiceIp;
    // Monitor guarding creation of zkClient.
    private static Object LOCK = new Object();
    // ZooKeeper path holding settings shared by all applications.
    private static String                        commonNode        = "/ihome-common";
    // Settings read from "/" + appName.
    private static Map<String, String>           settings          = new HashMap<>();
    // Settings read from commonNode; consulted when a key is missing from settings.
    private static Map<String, String>           commonSettings    = new HashMap<>();
    // Settings of other services, loaded on demand by getConfig(serviceName, key).
    private static Map<String,Map<String,String>> otherSettings = new ConcurrentHashMap<>();
    // Parent ZooKeeper path -> the settings map backing that path.
    private static final Map<String, Map<String, String>> settingMaps             = new HashMap<>();
    // Spring context captured through ApplicationContextAware.
    private static ApplicationContext applicationContext;
    // Passed to the ZkClient constructor as the session timeout (30 * 1000).
    private static final int                              DEFAULT_SESSION_TIMEOUT = 30 * 1000;
    // SOAServiceNodeType value that requests both a K8S node and a POD node.
    private final static String K8S_AND_POD = "K8S&POD";

    // NOTE(review): only referenced from commented-out code in the visible part of this file.
    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * Creates the shared ZooKeeper client if it does not exist yet, then bootstraps this bean by
     * calling {@code init()}, {@code initSoa()}, {@code initConfig()} and
     * {@code registerServiceAddress()} in that order.
     *
     * <p>The connect string comes from the {@code SVJIA_ZK_SERVER} environment variable and is
     * overridden by the configured {@code zkAddress} when that is non-empty.
     *
     * @throws IllegalStateException if the ZooKeeper client cannot be created
     */
    @PostConstruct
    private void getZkClient(){
        // The lock is always taken: this runs once per bean, and it avoids an unsynchronized
        // read of the non-volatile zkClient field.
        synchronized (LOCK) {
            if (zkClient == null) {
                String address = "";
                String envAddress = System.getenv(SVJIA_ZK_SERVER);
                if (!StringHelper.isNullOrEmpty(envAddress)) {
                    address = envAddress;
                }
                if (!StringUtils.isEmpty(zkAddress)) {
                    address = zkAddress;
                }

                try {
                    zkClient = new ZkClient(address, DEFAULT_SESSION_TIMEOUT, 5000, new SOPZkSerializer());
                } catch (Exception e) {
                    // Without a client every following step would fail with an unexplained
                    // NullPointerException, so fail here with the real cause attached.
                    log.error("failed to create zk client, address:{}", address, e);
                    throw new IllegalStateException("Unable to create ZooKeeper client for [" + address + "]", e);
                }
            }
        }

        applicationName = appName;
        localIP = TraceIdHelper.getLocalIPAddress();
        init();
        initSoa();
        initConfig();
        registerServiceAddress();
    }

    /**
     * Initializes service discovery: picks up the k8s service IP from the environment, subscribes
     * to child changes of the service root node, loads its current children, and finally
     * subscribes to connection state changes.
     *
     * @author Chenjw
     * @since 2018/6/13
     */
    public void initSoa() {
        String serviceIpFromEnv = System.getenv().get(K8S_SERVICE_IP);
        if (!StringHelper.isNullOrEmpty(serviceIpFromEnv)) {
            k8sServiceIp = serviceIpFromEnv;
            log.info("get k8s service ip in environment variable:{}", k8sServiceIp);
        }

        boolean rootPresent = zkClient.exists(SOA_SERVICE_NODE);
        if (!rootPresent) {
            log.error("soaservices node was not found!");
        } else {
            zkClient.subscribeChildChanges(SOA_SERVICE_NODE, sopSOAListener);
            List<String> services = zkClient.getChildren(SOA_SERVICE_NODE);
            initSoaChildren(services);
        }

        // State changes are subscribed regardless of whether the root node exists.
        zkClient.subscribeStateChanges(sopSOAListener);
    }

    /**
     * Loads every setting below the common node into {@code commonSettings}, subscribes to data
     * changes of each setting and to child changes of the common node itself.
     *
     * <p>The common settings map is registered in {@code settingMaps} once, before the children are
     * read, so it is registered even when the node currently has no children (previously the
     * registration only happened as a side effect of reading the first child).
     */
    private void initCommon(){
        if (!zkClient.exists(commonNode)) {
            log.warn("{} node not found!", commonNode);
            return;
        }

        settingMaps.put(getZKPath(commonNode), commonSettings);

        List<String> yardList = zkClient.getChildren(commonNode);
        for (String key : yardList) {
            String value = StringHelper.trim(zkClient.readData(commonNode + "/" + key));
            log.info("read common setting from zk, key:[{}], value:[{}]", key, value);
            commonSettings.put(key.trim(), value);
            zkClient.subscribeDataChanges(getZKPath(commonNode, key), sopConfigListener);
        }
        zkClient.subscribeChildChanges(commonNode, sopConfigListener);
    }

    /**
     * Loads the common settings and then this application's own settings from
     * {@code "/" + appName}, subscribing to data changes of each setting and to child changes of
     * the application node. The application's settings map is registered in {@code settingMaps}
     * whether or not the node exists.
     */
    public void initConfig(){
        initCommon();

        String appRoot = "/" + appName;
        if (zkClient.exists(appRoot)) {
            for (String key : zkClient.getChildren(appRoot)) {
                String nodePath = contactKey(key);
                String value = StringHelper.trim(zkClient.readData(nodePath));
                log.info("read setting from zk, key:[{}], value:[{}]", key, value);
                settings.put(key.trim(), value);
                zkClient.subscribeDataChanges(nodePath, sopConfigListener);
            }
            zkClient.subscribeChildChanges(appRoot, sopConfigListener);
        } else {
            log.warn("node {} not found!", appRoot);
        }
        settingMaps.put(appRoot, settings);
    }

    /**
     * Builds the ZooKeeper path of one of this application's settings.
     *
     * @param key setting name
     * @return {@code "/" + appName + "/" + key}
     */
    private String contactKey(String key) {
        return String.join("/", "", appName, key);
    }

    /**
     * Looks up a setting, preferring this application's own settings over the common ones.
     *
     * @param key setting name
     * @return the application's value when the key is present there, otherwise the common value
     *         (which may be {@code null})
     */
    public String getConfig(String key) {
        return settings.containsKey(key) ? settings.get(key) : commonSettings.get(key);
    }

    /**
     * Looks up a setting of another service, loading that service's settings from
     * {@code "/" + serviceName} on first use and subscribing to their changes.
     *
     * @param serviceName name of the service whose settings are queried
     * @param key setting name
     * @return the value, or {@code null} when the service node does not exist, the key is unknown,
     *         or any error occurs while loading
     */
    public String getConfig(String serviceName,String key){
        // True only while this call is in the middle of populating a freshly registered map.
        boolean loading = false;
        try {
            if (!otherSettings.containsKey(serviceName)) {
                String rootPathName = "/" + serviceName;
                if(!zkClient.exists(rootPathName)){
                    return null;
                }
                Map<String, String> map = new HashMap<>();
                // Registered before the load, as before. NOTE(review): the listener subscribed
                // below presumably looks this map up; its code is not visible here.
                otherSettings.put(serviceName, map);
                loading = true;

                List<String> yardList = zkClient.getChildren(rootPathName);
                for (String key2 : yardList) {
                    String key3 = rootPathName + "/" + key2;
                    String value = StringHelper.trim(zkClient.readData(key3));
                    map.put(key2.trim(), value);

                    // Watch each individual setting value.
                    zkClient.subscribeDataChanges(key3, sopOtherConfigListener);
                }
                zkClient.subscribeChildChanges(rootPathName,sopOtherConfigListener);
                loading = false;

                log.info("load {} config",serviceName);
            }

            return otherSettings.get(serviceName).get(key);
        }catch (Exception e){
            if (loading) {
                // Drop the half-populated map so the next call retries the load instead of
                // serving incomplete settings forever.
                otherSettings.remove(serviceName);
            }
            log.error("{}:{}获取错误",serviceName,key,e);
            return null;
        }
    }

    /**
     * Updates the locally cached value of a setting identified by service name and key.
     *
     * @param serviceName name of the service owning the setting
     * @param key setting name
     * @param value new value
     */
    public void updateConfig(String serviceName,String key,String value){
        updateConfig(String.format("/%s/%s",serviceName,key), value);
    }
    /**
     * Stores a new value for the setting at {@code dataPath} in the settings map registered for
     * its parent path.
     *
     * <p>Paths with a single segment are ignored. A path whose parent has no registered settings
     * map is logged and ignored instead of failing with a {@code NullPointerException}.
     *
     * @param dataPath full ZooKeeper path of the setting
     * @param data new value; stored after {@code StringHelper.trim}
     */
    private void updateConfig(String dataPath, Object data) {
        String[] array = getParentAndChild(dataPath);
        if (array == null) return;

        Map<String, String> nodeSettings = settingMaps.get(array[0]);
        if (nodeSettings == null) {
            log.warn("zkNode:{} value change ignored, no settings registered for parent:{}", dataPath, array[0]);
            return;
        }
        nodeSettings.put(array[1], StringHelper.trim(data));
        log.info("zkNode:{} value change, new value:{}", dataPath, data);
    }
    /**
     * Currently a no-op: the delegation that used to follow is disabled, so calling this method
     * changes nothing. The unused path computation has been removed.
     *
     * @param serviceName name of the service owning the setting (unused)
     * @param key setting name (unused)
     * @param value setting value (unused)
     */
    public void addConfig(String serviceName,String key,String value){
        // Intentionally empty; see the method comment.
    }
    /**
     * Reads every not-yet-known child setting of {@code parentPath} from ZooKeeper, caches it in
     * the settings map registered for that path (creating the map if needed) and subscribes to
     * its data changes.
     *
     * <p>Keys are stored trimmed, matching both the existence check in this method and the way the
     * init methods store keys; previously the untrimmed key was stored.
     *
     * @param parentPath ZooKeeper path of the parent node
     * @param keys child node names currently present under {@code parentPath}
     */
    private void addConfig(String parentPath, List<String> keys) {
        Map<String, String> nodeSettings = settingMaps.computeIfAbsent(parentPath, k -> new HashMap<>());
        for (String key : keys) {
            String settingName = key.trim();
            if (nodeSettings.containsKey(settingName)) {
                continue;
            }
            // The ZooKeeper path uses the node name exactly as reported.
            String dataPath = parentPath + "/" + key;
            String value = StringHelper.trim(zkClient.readData(dataPath));
            nodeSettings.put(settingName, value);
            zkClient.subscribeDataChanges(dataPath, sopConfigListener);
            log.info("create zkNode:{}, value:{}, subscribe data change", dataPath, value);
        }
    }

    /**
     * Removes the cached setting at {@code dataPath} and unsubscribes from its data changes.
     *
     * <p>Nothing happens for single-segment paths, for paths whose parent has no registered
     * settings map (previously a {@code NullPointerException}), or for unknown keys.
     *
     * @param dataPath full ZooKeeper path of the removed setting
     */
    private void removeConfig(String dataPath) {
        String[] array = getParentAndChild(dataPath);
        if (array == null) return;

        Map<String, String> nodeSettings = settingMaps.get(array[0]);
        if (nodeSettings == null || !nodeSettings.containsKey(array[1])) {
            return;
        }
        nodeSettings.remove(array[1]);
        zkClient.unsubscribeDataChanges(dataPath, sopConfigListener);
        log.info("remove zkNode:{}, unsubscribe data change", dataPath);
    }


    /**
     * Returns an available address of the given service.
     *
     * <p>In the PRE and PROD stages, or when no per-service override exists, the default lookup
     * (no explicit node type) is used. Otherwise the override is interpreted: a value that is not
     * a stage name is returned as the address itself, the stage name {@code CUSTOM} selects a
     * custom node, and any other stage name falls back to the default lookup.
     *
     * @param serviceName service name
     * @return an available address, or {@code null} if there is none
     * @author liuhf
     * @since 2018/6/12
     */
    public String getServiceAddress(String serviceName) {
        SystemStageEnum stage = getStage();
        boolean fixedStage = stage == SystemStageEnum.PRE || stage == SystemStageEnum.PROD;
        if (fixedStage || serviceConfig == null || !serviceConfig.containsKey(serviceName)) {
            return getServiceAddress(serviceName, null);
        }

        String override = serviceConfig.get(serviceName);
        SystemStageEnum overrideStage = SystemStageEnum.getByCode(override);
        if (overrideStage == null) {
            // Not a stage name: the configured value is the address.
            return override;
        }

        NodeType wanted = overrideStage == SystemStageEnum.CUSTOM ? NodeType.CUSTOM : null;
        return getServiceAddress(serviceName, wanted);
    }

    /**
     * Returns an available address of the given service for a node type.
     *
     * <p>If the service is not known yet, the service root node is re-read; when the service is
     * listed there it is recorded as subscribed and the listed services are initialized.
     *
     * @param serviceName service name
     * @param type node type; when {@code null}, a POD node is preferred and a K8S node is used
     *             otherwise
     * @return an available address, or {@code null} if the service or the root node is missing
     */
    public String getServiceAddress(String serviceName, NodeType type) {
        if (!existService(serviceName)) {
            if (!zkClient.exists(SOA_SERVICE_NODE)) {
                log.error("soaservices node was not found!");
                return null;
            }
            List<String> registered = zkClient.getChildren(SOA_SERVICE_NODE);
            if (!registered.contains(serviceName)) {
                return null;
            }
            addSubscribeServiceName(serviceName);
            initSoaChildren(registered);
        }

        SOAServiceAddress target = serviceAddress.get(serviceName);
        NodeType effectiveType = type;
        if (effectiveType == null) {
            // Prefer the pod IP node, then fall back to the service IP node.
            effectiveType = target.exist(NodeType.POD) ? NodeType.POD : NodeType.K8S;
        }
        return target.getNode(effectiveType);
    }

    /**
     * Registers every previously registered application name again, or this application's own
     * name when nothing has been registered yet.
     */
    public void registerServiceAddress() {
        boolean nothingRegistered = registerNodeNames == null || registerNodeNames.size() <= 0;
        if (nothingRegistered) {
            registerServiceAddress(appName);
            return;
        }
        for (String registeredName : registerNodeNames) {
            registerServiceAddress(registeredName);
        }
    }

    /**
     * Adds the given application names to the registered set and then registers every name in
     * that set (including names added earlier).
     *
     * @param appNames application names to add
     */
    public void registerServiceAddress(List<String> appNames) {
        registerNodeNames.addAll(appNames);
        for (String registeredName : registerNodeNames) {
            registerServiceAddress(registeredName);
        }
    }

    /**
     * Registers one application name in the service center.
     *
     * <p>The persistent service node is created if missing. In the CUSTOM stage a single ephemeral
     * node for {@code customIp} is created, carrying the configured server type when one is set.
     * In every other stage the {@code SOAServiceNodeType} setting decides whether a K8S node, a
     * POD node (the default), or both are registered.
     *
     * <p>The configured server type is matched case-insensitively (surrounding whitespace is
     * ignored), consistent with how node types and stages are parsed elsewhere in this class.
     *
     * @param appName application name to register
     * @throws IllegalArgumentException if the configured server type is not a {@code NodeType}
     * @author liuhf
     * @since 2018/8/4
     */
    public void registerServiceAddress(String appName) {
        registerNodeNames.add(appName);
        SystemStageEnum stage = getStage();
        createPersistentZKNode(getZKPath(SOA_SERVICE_NODE, appName), null);

        if (stage == SystemStageEnum.CUSTOM) {
            // No system stage defined for this environment.
            NodeProperties nodeProperties = null;
            if(!StringUtils.isEmpty(serverType)){
                nodeProperties = new NodeProperties(NodeType.valueOf(serverType.trim().toUpperCase(Locale.ROOT)));
            }
            createEphemeralZKNode(getZKPath(SOA_SERVICE_NODE, appName, customIp), nodeProperties);
            return;
        }

        String nodeType = getConfig(SOAServiceNodeType);
        if (K8S_AND_POD.equalsIgnoreCase(nodeType)) {
            registerK8SNode(appName);
            registerPodNode(appName);
        } else if (NodeType.K8S.toString().equalsIgnoreCase(nodeType)) {
            // Register the service IP node.
            registerK8SNode(appName);
        } else {
            // Register the pod node.
            registerPodNode(appName);
        }
    }

    /**
     * Registers this instance below the k8s service IP node of the given application: the service
     * IP node is created as a persistent K8S node (if missing) and this pod's address is created
     * as an ephemeral child of it.
     *
     * @param appName application name
     * @throws RuntimeException if no k8s service IP is known
     */
    private void registerK8SNode(String appName) {
        boolean serviceIpMissing = StringUtils.isEmpty(k8sServiceIp);
        if (serviceIpMissing) {
            throw new RuntimeException("Environment Variable [K8S_SERVICE_IP] is not exist");
        }

        String serviceIpPath = getZKPath(SOA_SERVICE_NODE, appName, k8sServiceIp);
        createPersistentZKNode(serviceIpPath, new NodeProperties(NodeType.K8S).toString());

        String podPath = getZKPath(serviceIpPath, podAddress);
        createEphemeralZKNode(podPath, null);
        log.debug("register pod node {} into serviceIp {}", podAddress, k8sServiceIp);
    }

    /**
     * Registers this pod's address as an ephemeral POD node directly below the application node.
     *
     * @param appName application name
     */
    private void registerPodNode(String appName) {
        String podPath = getZKPath(SOA_SERVICE_NODE, appName, podAddress);
        String podProperties = new NodeProperties(NodeType.POD).toString();
        createEphemeralZKNode(podPath, podProperties);
        log.debug("register pod node {}", podAddress);
    }

    /**
     * Reacts to a child change below the service tree.
     *
     * <p>For the service root itself, the reported children are initialized, or all services are
     * reset when there are none. For any deeper path, the service named by the third path segment
     * (index 2 after splitting on {@code "/"}) is reloaded.
     *
     * @param parentPath path whose children changed
     * @param children current children of {@code parentPath}
     */
    public void reloadSoaService(String parentPath, List<String> children) {
        if (!parentPath.equals(SOA_SERVICE_NODE)) {
            String[] segments = parentPath.split("/");
            initSoaChildrenInner(segments[2]);
            return;
        }

        boolean noChildren = children == null || children.size() <= 0;
        if (noChildren) {
            reset();
        } else {
            initSoaChildren(children);
        }
    }

    /**
     * Removes one address from a service's list for the given node type.
     *
     * <p>Does nothing when the service is unknown (previously a {@code NullPointerException}) or
     * the address is not present. The service lock is released in a {@code finally} block so it
     * cannot stay held if the removal fails.
     *
     * @param serviceName service name
     * @param type node type of the address
     * @param ip address to remove
     */
    public void removeSoaNode(String serviceName, NodeType type, String ip) {
        SOAServiceAddress service = serviceAddress.get(serviceName);
        if (service == null || !service.exist(type, ip)) {
            return;
        }
        service.lock();
        try {
            service.remove(type, ip);
        } catch (Exception e) {
            log.error("remove service [{}-{}-{}] error", serviceName, type, ip, e);
        } finally {
            service.unLock();
        }
        log.info("remove service-ip:[{}] from serviceName:[{}], current serviceIps:[{}]", ip, serviceName, service.toString());
    }

    /**
     * Adds one address to a service's list for the given node type.
     *
     * <p>Does nothing when the address is already present. An unknown service is logged and
     * ignored (previously a {@code NullPointerException}). The service lock is released in a
     * {@code finally} block so it cannot stay held if the addition fails.
     *
     * @param serviceName service name
     * @param type node type of the address
     * @param ip address to add
     */
    public void addSoaNode(String serviceName, NodeType type, String ip) {
        SOAServiceAddress service = serviceAddress.get(serviceName);
        if (service == null) {
            log.warn("add service [{}-{}-{}] ignored, service is not initialized", serviceName, type, ip);
            return;
        }
        if (service.exist(type, ip)) {
            return;
        }
        service.lock();
        try {
            service.add(type, ip);
        } catch (Exception e) {
            log.error("add service [{}-{}-{}] error", serviceName, type, ip, e);
        } finally {
            service.unLock();
        }
        log.info("add service-ip:[{}] from serviceName:[{}], current serviceIps:[{}]", ip, serviceName, service.toString());
    }

    /**
     * Stores the Spring application context in the class-wide static field.
     *
     * @param applicationContext context supplied by Spring
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        // The field is static, so it is addressed through the class rather than "this".
        SOAServiceCenter.applicationContext = applicationContext;
    }



    /**
     * Records a service name in the set of subscribed services.
     *
     * @param serviceName service name to record
     */
    public void addSubscribeServiceName(String serviceName) {
        subscribeServiceNames.add(serviceName);
    }

    /**
     * Exposes the known services without allowing callers to modify the map.
     *
     * @return unmodifiable view of service name to addresses
     */
    public Map<String, SOAServiceAddress> getServiceAddresses() {
        Map<String, SOAServiceAddress> readOnlyView = Collections.unmodifiableMap(serviceAddress);
        return readOnlyView;
    }

    /**
     * Resets the provider information of every known service by resetting each one with no
     * nodes.
     *
     * @author liuhf
     * @since 2018/8/4
     */
    private void reset() {
        for (String knownService : serviceAddress.keySet()) {
            resetService(knownService, null);
        }
        log.info("service center has been clear");
    }

    /**
     * Tells whether the service center already tracks the given service.
     *
     * @param serviceName service name
     * @return {@code true} if the service is tracked, {@code false} otherwise
     * @author liuhf
     * @since 2018/8/4
     */
    private boolean existService(String serviceName) {
        boolean tracked = serviceAddress.containsKey(serviceName);
        return tracked;
    }

    /**
     * Replaces the available node lists of one service.
     *
     * <p>An unknown service is first registered with an empty address holder; if there are no
     * nodes to set, that is all that happens. Otherwise the holder is cleared and given the new
     * node lists while its lock is held. The lock is released in a {@code finally} block so it
     * cannot stay held if the update fails.
     *
     * @param serviceName service name
     * @param soaServiceNodes node lists keyed by node type name; may be {@code null}
     */
    private void resetService(String serviceName, Map<String, List<SOAServiceNode>> soaServiceNodes) {
        if (!existService(serviceName)) {
            serviceAddress.put(serviceName, new SOAServiceAddress());
            if (soaServiceNodes == null || soaServiceNodes.isEmpty()) {
                return;
            }
        }

        SOAServiceAddress service = serviceAddress.get(serviceName);
        service.lock();
        try {
            service.clear();
            service.setNodeListMap(soaServiceNodes);
        } catch (Exception e) {
            log.error("reset service [{}] error", serviceName, e);
        } finally {
            service.unLock();
        }
    }



    /**
     * Initializes every listed service that is not tracked yet: subscribes to child changes of
     * its node and loads its current provider nodes.
     *
     * @param children service names found below the service root node
     */
    private void initSoaChildren(List<String> children) {
        for (String serviceName : children) {
            boolean alreadyTracked = existService(serviceName);
            if (!alreadyTracked) {
                String servicePath = getZKPath(SOA_SERVICE_NODE, serviceName);
                zkClient.subscribeChildChanges(servicePath, sopSOAListener);
                initSoaChildrenInner(serviceName);
                log.debug("init and listener soa service: {}", serviceName);
            }
        }
    }

    /**
     * Reads the provider nodes of one service from ZooKeeper, groups the enabled ones by node
     * type and hands the result to {@code resetService}.
     *
     * <p>K8S nodes are subscribed for child and data changes and only counted when they have at
     * least one child; POD nodes are subscribed for data changes; every other node is treated as
     * a custom node. Only non-empty groups are passed on.
     *
     * @param appName service name
     */
    private void initSoaChildrenInner(String appName) {
        List<String> nodes = zkClient.getChildren(getZKPath(SOA_SERVICE_NODE, appName));
        List<SOAServiceNode> k8sNodes = new ArrayList<>();
        List<SOAServiceNode> podNodes = new ArrayList<>();
        List<SOAServiceNode> customNodes = new ArrayList<>();

        if (nodes != null && !nodes.isEmpty()) {
            for (String node : nodes) {
                String path = getZKPath(SOA_SERVICE_NODE, appName, node);
                if (!zkClient.exists(path)) {
                    continue;
                }
                String value = zkClient.readData(path);
                // Skip nodes that are marked as disabled.
                NodeProperties properties = new NodeProperties().parse(value);
                if (!properties.isEnabled()) {
                    continue;
                }

                SOAServiceNode candidate = new SOAServiceNode(node, properties);
                NodeType nodeType = properties.getType();
                if (nodeType == NodeType.K8S) {
                    // A k8s service IP node only counts when it has pod children.
                    zkClient.subscribeChildChanges(path, sopSOAListener);
                    zkClient.subscribeDataChanges(path, sopSOAListener);
                    List<String> pods = zkClient.getChildren(path);
                    boolean hasPods = pods != null && !pods.isEmpty();
                    if (hasPods && !k8sNodes.contains(candidate)) {
                        k8sNodes.add(candidate);
                        log.debug("find soa service k8sServiceIp node: {}", path);
                    }
                } else if (nodeType == NodeType.POD) {
                    zkClient.subscribeDataChanges(path, sopSOAListener);
                    if (!podNodes.contains(candidate)) {
                        podNodes.add(candidate);
                        log.debug("find soa service podIp node: {}", path);
                    }
                } else if (!customNodes.contains(candidate)) {
                    // Anything else is a custom node.
                    customNodes.add(candidate);
                    log.debug("find soa service customIp node: {}", path);
                }
            }
        }

        Map<String, List<SOAServiceNode>> grouped = new HashMap<>();
        if (!k8sNodes.isEmpty()) {
            grouped.put(NodeType.K8S.toString(), k8sNodes);
        }
        if (!podNodes.isEmpty()) {
            grouped.put(NodeType.POD.toString(), podNodes);
        }
        if (!customNodes.isEmpty()) {
            grouped.put(NodeType.CUSTOM.toString(), customNodes);
        }

        resetService(appName, grouped);
    }


    /*
    @Autowired
    public void init(SOAServiceProperties properties) {
        serviceConfig = properties.getServiceConfig();

        ConfigurableEnvironment env = applicationContext.getEnvironment();
        String port = StringUtils.defaultIfBlank(properties.getPort(), StringUtils.defaultIfBlank(env.getProperty("server.port"), "8080"));
        podAddress = Constant.LOCAL_IP + ":" + port;
        customIp = Optional.ofNullable(properties.getLocalIp()).orElse(podAddress);
    }*/

    /**
     * Computes the address this instance registers under: {@code localIP + ":" + port}, stored as
     * both {@code podAddress} and {@code customIp}.
     *
     * <p>The port is read from the {@code server.port} property and defaults to {@code 8080} when
     * the property is absent, so the address never ends in {@code ":null"}. The environment is
     * used through the type returned by the context; no cast to a concrete environment class is
     * needed to read a property.
     */
    public void init() {
        String port = applicationContext.getEnvironment().getProperty("server.port", "8080");
        podAddress = localIP + ":" + port;
        customIp = podAddress;
    }

    /**
     * Splits a path at its last {@code "/"} into parent path and child name.
     *
     * @param dataPath path to split
     * @return a two-element array {@code {parent, child}}, or {@code null} when the first and last
     *         {@code "/"} are at the same position (a single-segment path)
     */
    private static String[] getParentAndChild(String dataPath) {
        int firstSlash = dataPath.indexOf("/");
        int lastSlash = dataPath.lastIndexOf("/");
        if (firstSlash == lastSlash) {
            // Single-segment paths are not handled.
            return null;
        }

        String parent = dataPath.substring(0, lastSlash);
        String child = dataPath.substring(lastSlash + 1);
        return new String[] { parent, child };
    }

    /**
     * Concatenates path segments, putting a {@code "/"} in front of every segment that does not
     * already start with one.
     *
     * @param nodes path segments, in order
     * @return the joined path; empty when no segments are given
     */
    private static String getZKPath(String... nodes) {
        StringBuilder path = new StringBuilder();
        for (String segment : nodes) {
            if (!segment.startsWith("/")) {
                path.append("/");
            }
            path.append(segment);
        }
        return path.toString();
    }
    /**
     * Determines the system stage from the {@code STAGE} environment variable.
     *
     * @return the stage named by the variable (matched after upper-casing), or {@code CUSTOM} when
     *         the variable is missing or empty
     * @throws IllegalArgumentException if the variable names no known stage; the original lookup
     *         failure is attached as the cause
     */
    private SystemStageEnum getStage(){
        String configuredStage = System.getenv("STAGE");
        if (StringHelper.isNullOrEmpty(configuredStage)) {
            return SystemStageEnum.CUSTOM;
        }
        try {
            return SystemStageEnum.valueOf(configuredStage.toUpperCase());
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown system stage:" + configuredStage, e);
        }
    }

    /**
     * Creates a persistent node with the given data unless it already exists.
     *
     * <p>Another client may create the same node between the existence check and the create call
     * (for example several instances of one application starting together); that outcome is
     * treated the same as the node having existed all along.
     *
     * @param path ZooKeeper path to create
     * @param data node data; may be {@code null}
     */
    public static void createPersistentZKNode(String path, Object data) {
        if (zkClient.exists(path)) {
            return;
        }
        try {
            zkClient.createPersistent(path, data);
            log.info("create persistent zk path:{}, value:{}", path, data);
        } catch (ZkNodeExistsException e) {
            // Lost the race against another creator; the node is there, which is all we need.
            log.debug("persistent zk path:{} was created concurrently", path);
        }
    }
    /**
     * Creates an ephemeral node with the given data, deleting an existing node at that path
     * first.
     *
     * @param path ZooKeeper path to create
     * @param data node data; may be {@code null}
     */
    public static void createEphemeralZKNode(String path, Object data) {
        boolean alreadyThere = zkClient.exists(path);
        if (alreadyThere) {
            zkClient.delete(path);
        }
        zkClient.createEphemeral(path, data);

        if (alreadyThere) {
            log.info("reCreate ephemeral zk path:{}, value:{}", path, data);
        } else {
            log.info("create ephemeral zk path:{}, value:{}", path, data);
        }
    }
    /**
     * Deletes the node at the given path when it exists.
     *
     * @param path ZooKeeper path to delete
     */
    public static void deleteEphemeralZKNode(String path) {
        boolean present = zkClient.exists(path);
        if (!present) {
            return;
        }
        zkClient.delete(path);
        log.info("delete ephemeral zk path:{}", path);
    }



    /**
     * Serializer that stores node data as the UTF-8 bytes of its string form.
     *
     * <p>The charset is passed as a constant, so no {@code UnsupportedEncodingException} can
     * occur and neither method silently returns {@code null} on an encoding failure.
     */
    class SOPZkSerializer implements ZkSerializer {
        /**
         * @param data value to store; converted with {@code String.valueOf}
         * @return UTF-8 bytes of the string form
         */
        @Override
        public byte[] serialize(Object data) throws ZkMarshallingError {
            return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
        }

        /**
         * @param bytes stored bytes; may be {@code null}
         * @return the bytes decoded as UTF-8, or {@code null} for {@code null} input
         */
        @Override
        public Object deserialize(byte[] bytes) throws ZkMarshallingError {
            if (bytes == null) {
                return null;
            }
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }
    /** Kinds of provider nodes a service can register. */
    enum NodeType {
        /** Node named after a k8s service IP; pods register as its children. */
        K8S,
        /** Node named after a pod address. */
        POD,
        /** Any other node; also the default type of {@code NodeProperties}. */
        CUSTOM
    }

    /** Deployment stages the application can run in. */
    enum SystemStageEnum {

        /** Custom environment; used when no stage is configured. */
        CUSTOM,
        /** Development environment. */
        DEV,
        /** Test environment. */
        TEST,
        /** Pre-release environment. */
        PRE,
        /** Production environment. */
        PROD;

        /**
         * Looks up a stage by its exact (case-sensitive) name.
         *
         * @param code stage name; may be {@code null}
         * @return the matching stage, or {@code null} when {@code code} is {@code null} or names
         *         no stage
         */
        public static SystemStageEnum getByCode(String code) {
            if (code == null) {
                // valueOf(null) throws NullPointerException, which the catch below would miss.
                return null;
            }
            try {
                return SystemStageEnum.valueOf(code);
            } catch (IllegalArgumentException e) {
                return null;
            }
        }

    }

    /**
     * Properties stored as the data of a provider node: its type and whether it is enabled.
     *
     * <p>The string form is {@code type=<type>&enabled=<enabled>}. A value without any
     * {@code "="} is read as the legacy format that holds only the node type.
     */
    @Getter
    @Setter
    public class NodeProperties {

        private static final String SPLIT = "&";

        private NodeType type = NodeType.CUSTOM;

        private boolean enabled = true;

        public NodeProperties() {
        }

        public NodeProperties(NodeType type) {
            this.type = type;
        }

        /**
         * Parses node data into a new {@code NodeProperties}; this instance is not modified.
         *
         * @param value node data; {@code null} or empty yields the defaults (CUSTOM, enabled)
         * @return the parsed properties
         * @throws IllegalArgumentException if a type name is not a {@code NodeType}
         * @throws IllegalStateException if a unit is not of the form {@code key=value} or uses an
         *         unknown key
         */
        public NodeProperties parse(String value) {
            NodeProperties nodeProperties = new NodeProperties();
            if (StringUtils.isEmpty(value)) {
                return nodeProperties;
            }

            // Legacy format: only the node type is stored. Detected by the absence of "=" so
            // that a single "key=value" unit is still parsed as a property.
            if (!value.contains("=")) {
                nodeProperties.setType(NodeType.valueOf(value.toUpperCase()));
                return nodeProperties;
            }

            for (String unit : value.split(SPLIT)) {
                String[] keyValue = unit.split("=");
                if (keyValue.length < 2) {
                    throw new IllegalStateException("malformed node property: " + unit);
                }
                assign(nodeProperties, keyValue[0], keyValue[1]);
            }
            return nodeProperties;
        }

        /** Applies one parsed {@code key=value} pair to the target properties. */
        private void assign(NodeProperties nodeProperties, String key, String value) {
            switch (key) {
                case "type":
                    nodeProperties.setType(NodeType.valueOf(value.toUpperCase()));
                    break;
                case "enabled":
                    nodeProperties.setEnabled(Boolean.parseBoolean(value));
                    break;
                default:
                    throw new IllegalStateException("unknown node property: " + key);
            }
        }

        /**
         * @return the properties in the form {@code type=<type>&enabled=<enabled>}
         */
        @Override
        public String toString() {
            return "type=" + type + SPLIT + "enabled=" + enabled;
        }

    }

    class SOPSOAListener implements IZkDataListener, IZkChildListener, IZkStateListener {

        @Override
        public void handleChildChange(String parentPath, List<String> children) {
            MDC.put("_REQ_ID", TraceIdHelper.generate());
            try {
                if (parentPath.contains(SOA_SERVICE_NODE)) {
                    log.debug("zk reloadSoaService due to childChange from {}", parentPath);
                    reloadSoaService(parentPath, children);
                    applicationContext.publishEvent(new ZKEvent(this));
                }
            } catch (Exception ex) {
                log.error("handleChildChange", ex);
            }
            MDC.clear();

        }

        @Override
        public void handleStateChanged(Watcher.Event.KeeperState state) {
            if (state.equals(Watcher.Event.KeeperState.SyncConnected)) {
                if(true){
                //if (Boolean.parseBoolean(SwjConfig.getIsRegisterApi())) {
                    registerServiceAddress();
                    applicationContext.publishEvent(new ZKEvent(this));
                }
            }
            log.info("zk connect state:{}", state.toString());
        }

        @Override
        public void handleNewSession() {
            log.info("handleNewSession");
        }

        @Override
        public void handleSessionEstablishmentError(Throwable error) {
            log.info("handleSessionEstablishmentError");
        }

        @Override
        public void handleDataChange(String dataPath, Object data) {
            MDC.put("_REQ_ID", TraceIdHelper.generate());
            try {
                if (dataPath.contains(SOA_SERVICE_NODE)) {
                    log.debug("handleDataChange, dataPath: {}, value: {}", dataPath, data);
                    String[] array = dataPath.split("/");
                    // 节点格式：/soaservices/appName/ipNode
                    if (array.length < 4) {
                        log.error("handleDataChange, dataPath [{}] is invalid", dataPath);
                        return;
                    }
                    if (StringHelper.isObjectNullOrEmpty(data)) {
                        return;
                    }
                    NodeProperties nodeProperties = new NodeProperties().parse(data.toString());
                    // k8s节点需要有子节点才添加
                    if (nodeProperties.isEnabled()) {
                        if (NodeType.K8S == nodeProperties.getType() && CollectionUtils.isEmpty(zkClient.getChildren(dataPath))) {
                            removeSoaNode(array[2], nodeProperties.getType(), array[3]);
                            return;
                        }
                        addSoaNode(array[2], nodeProperties.getType(), array[3]);
                    } else {
                        removeSoaNode(array[2], nodeProperties.getType(), array[3]);
                    }
                    applicationContext.publishEvent(new ZKEvent(this));
                }
            } catch (Exception ex) {
                log.error("handleChildChange", ex);
            }
            MDC.clear();
        }

        @Override
        public void handleDataDeleted(String dataPath) {

        }
    }

    class SOPConfigListener implements IZkDataListener, IZkChildListener {

        public void handleDataChange(String dataPath, Object data) {
            try {
                updateConfig(dataPath, data);
            } catch (Exception ex) {
                log.error("ZKConfigListener handleDataChange failure, path: {}, data: {}", dataPath, data, ex);
            }
        }

        public void handleDataDeleted(String dataPath) {
            try {
                removeConfig(dataPath);
            } catch (Exception ex) {
                log.error("ZKConfigListener handleDataDeleted failure, path: {}", dataPath, ex);
            }
        }

        public void handleChildChange(String parentPath, List<String> children) {
            try {
                addConfig(parentPath, children);
            } catch (Exception ex) {
                log.error("ZKConfigListener handleChildChange failure, path: {}, children: {}", parentPath, children, ex);
            }
        }

    }

    class SOPOtherConfigListener implements IZkDataListener, IZkChildListener {

        public void handleDataChange(String dataPath, Object data) {
            try {
                updateConfig2(dataPath, data);
            } catch (Exception ex) {
                log.error("ZKConfigListener handleDataChange failure, path: {}, data: {}", dataPath, data, ex);
            }
        }

        public void handleDataDeleted(String dataPath) {
            try {
                removeConfig2(dataPath);
            } catch (Exception ex) {
                log.error("ZKConfigListener handleDataDeleted failure, path: {}", dataPath, ex);
            }
        }

        public void handleChildChange(String parentPath, List<String> children) {
            try {
                addConfig2(parentPath, children);
            } catch (Exception ex) {
                log.error("ZKConfigListener handleChildChange failure, path: {}, children: {}", parentPath, children, ex);
            }
        }

    }

    /**
     * Splits a node path into its non-empty {@code "/"}-separated segments.
     *
     * @param dataPath the node path to split
     * @return the non-empty segments in path order, or {@code null} when the path
     *         contains at most one {@code "/"}
     */
    private String [] getParentAndChild2(String dataPath){
        // A path with at most one slash is a single node and is not processed.
        int firstSlash = dataPath.indexOf("/");
        if (firstSlash == dataPath.lastIndexOf("/")) {
            return null;
        }

        // Drop the empty segments produced by leading or repeated slashes.
        return Arrays.stream(dataPath.split("/"))
                .filter(segment -> !StringUtils.isEmpty(segment))
                .toArray(String[]::new);
    }

    /**
     * Stores the trimmed new value of a changed node in {@code otherSettings}.
     *
     * <p>The first path segment selects the settings map and the second segment is the key.
     * The change is ignored when the path has fewer than two segments or when no settings
     * map exists for the first segment.
     *
     * @param dataPath the path of the node whose data changed
     * @param data     the new node data
     */
    private   void updateConfig2(String dataPath, Object data) {
        String[] array = getParentAndChild2(dataPath);
        // Both segments are indexed below, so a shorter result cannot be used.
        if (array == null || array.length < 2) {
            return;
        }

        Map<String, String> nodeSettings = otherSettings.get(array[0]);
        if (nodeSettings == null) {
            // Nothing has been registered under this parent; skip instead of failing on null.
            log.warn("zkNode:{} value change ignored, no settings registered for parent:{}", dataPath, array[0]);
            return;
        }
        nodeSettings.put(array[1], StringHelper.trim(data));
        log.info("zkNode:{} value change, new value:{}", dataPath, data);
    }

    /**
     * Loads the children of {@code parentPath} that are not yet present in the settings map
     * and subscribes to their data changes.
     *
     * <p>The settings map is keyed in {@code otherSettings} by {@code parentPath} with every
     * {@code "/"} removed, and is created when absent.
     *
     * @param parentPath the parent node path
     * @param keys       the current child names; {@code null} is tolerated and treated as
     *                   "no children" (the ZooKeeper client reports a deleted parent this way)
     */
    private void addConfig2(String parentPath, List<String> keys) {
        String configKey = parentPath.replace("/","");
        Map<String, String> nodeSettings = otherSettings.computeIfAbsent(configKey, k -> new HashMap<>());
        if (keys == null) {
            // Parent node no longer exists: there is nothing to read or subscribe to.
            log.info("zkNode:{} has no children to load", parentPath);
            return;
        }
        for (String key : keys) {
            // NOTE(review): the lookup uses the trimmed key while the value is stored under
            // the raw key; the two only differ if a child name carries surrounding whitespace.
            if (nodeSettings.containsKey(key.trim())) {
                continue;
            }
            String dataPath = parentPath + "/" + key;
            String value = StringHelper.trim(zkClient.readData(dataPath));
            nodeSettings.put(key, value);
            zkClient.subscribeDataChanges(dataPath, sopOtherConfigListener);
            log.info("create zkNode:{}, value:{}, subscribe data change", dataPath, value);
        }
    }

    /**
     * Removes a deleted node from {@code otherSettings} and unsubscribes from its data changes.
     *
     * <p>The first path segment selects the settings map and the second segment is the key.
     * Nothing happens when the path has fewer than two segments, when no settings map exists
     * for the first segment, or when the key is not present in that map.
     *
     * @param dataPath the path of the deleted node
     */
    private  void removeConfig2(String dataPath) {
        String[] array = getParentAndChild2(dataPath);
        // Both segments are indexed below, so a shorter result cannot be used.
        if (array == null || array.length < 2) {
            return;
        }

        Map<String, String> nodeSettings = otherSettings.get(array[0]);
        if (nodeSettings == null) {
            // Nothing has been registered under this parent, so there is nothing to remove.
            return;
        }
        if (nodeSettings.containsKey(array[1])) {
            nodeSettings.remove(array[1]);
            zkClient.unsubscribeDataChanges(dataPath, sopOtherConfigListener);
            log.info("remove zkNode:{}, unsubscribe data change", dataPath);
        }
    }

}
